home *** CD-ROM | disk | FTP | other *** search
/ Chip 2007 January, February, March & April / Chip-Cover-CD-2007-02.iso / Pakiet bezpieczenstwa / mini Pentoo LiveCD 2006.1 / mpentoo-2006.1.iso / livecd.squashfs / usr / lib / python2.4 / test / test_subprocess.pyc (.txt) < prev    next >
Python Compiled Bytecode  |  2005-10-18  |  19KB  |  522 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyc (Python 2.4)
  3.  
  4. import unittest
  5. from test import test_support
  6. import subprocess
  7. import sys
  8. import signal
  9. import os
  10. import tempfile
  11. import time
  12. import re
  13. mswindows = sys.platform == 'win32'
  14. if mswindows:
  15.     SETBINARY = 'import msvcrt; msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY);'
  16. else:
  17.     SETBINARY = ''
  18.  
  19. def remove_stderr_debug_decorations(stderr):
  20.     return re.sub('\\[\\d+ refs\\]\\r?\\n?$', '', stderr)
  21.  
  22.  
  23. class ProcessTestCase(unittest.TestCase):
  24.     
  25.     def mkstemp(self):
  26.         '''wrapper for mkstemp, calling mktemp if mkstemp is not available'''
  27.         if hasattr(tempfile, 'mkstemp'):
  28.             return tempfile.mkstemp()
  29.         else:
  30.             fname = tempfile.mktemp()
  31.             return (os.open(fname, os.O_RDWR | os.O_CREAT), fname)
  32.  
  33.     
  34.     def test_call_seq(self):
  35.         rc = subprocess.call([
  36.             sys.executable,
  37.             '-c',
  38.             'import sys; sys.exit(47)'])
  39.         self.assertEqual(rc, 47)
  40.  
  41.     
  42.     def test_call_kwargs(self):
  43.         newenv = os.environ.copy()
  44.         newenv['FRUIT'] = 'banana'
  45.         rc = subprocess.call([
  46.             sys.executable,
  47.             '-c',
  48.             'import sys, os;sys.exit(os.getenv("FRUIT")=="banana")'], env = newenv)
  49.         self.assertEqual(rc, 1)
  50.  
  51.     
  52.     def test_stdin_none(self):
  53.         p = subprocess.Popen([
  54.             sys.executable,
  55.             '-c',
  56.             'print "banana"'], stdout = subprocess.PIPE, stderr = subprocess.PIPE)
  57.         p.wait()
  58.         self.assertEqual(p.stdin, None)
  59.  
  60.     
  61.     def test_stdout_none(self):
  62.         p = subprocess.Popen([
  63.             sys.executable,
  64.             '-c',
  65.             'print "    this bit of output is from a test of stdout in a different process ..."'], stdin = subprocess.PIPE, stderr = subprocess.PIPE)
  66.         p.wait()
  67.         self.assertEqual(p.stdout, None)
  68.  
  69.     
  70.     def test_stderr_none(self):
  71.         p = subprocess.Popen([
  72.             sys.executable,
  73.             '-c',
  74.             'print "banana"'], stdin = subprocess.PIPE, stdout = subprocess.PIPE)
  75.         p.wait()
  76.         self.assertEqual(p.stderr, None)
  77.  
  78.     
  79.     def test_executable(self):
  80.         p = subprocess.Popen([
  81.             'somethingyoudonthave',
  82.             '-c',
  83.             'import sys; sys.exit(47)'], executable = sys.executable)
  84.         p.wait()
  85.         self.assertEqual(p.returncode, 47)
  86.  
  87.     
  88.     def test_stdin_pipe(self):
  89.         p = subprocess.Popen([
  90.             sys.executable,
  91.             '-c',
  92.             'import sys; sys.exit(sys.stdin.read() == "pear")'], stdin = subprocess.PIPE)
  93.         p.stdin.write('pear')
  94.         p.stdin.close()
  95.         p.wait()
  96.         self.assertEqual(p.returncode, 1)
  97.  
  98.     
  99.     def test_stdin_filedes(self):
  100.         tf = tempfile.TemporaryFile()
  101.         d = tf.fileno()
  102.         os.write(d, 'pear')
  103.         os.lseek(d, 0, 0)
  104.         p = subprocess.Popen([
  105.             sys.executable,
  106.             '-c',
  107.             'import sys; sys.exit(sys.stdin.read() == "pear")'], stdin = d)
  108.         p.wait()
  109.         self.assertEqual(p.returncode, 1)
  110.  
  111.     
  112.     def test_stdin_fileobj(self):
  113.         tf = tempfile.TemporaryFile()
  114.         tf.write('pear')
  115.         tf.seek(0)
  116.         p = subprocess.Popen([
  117.             sys.executable,
  118.             '-c',
  119.             'import sys; sys.exit(sys.stdin.read() == "pear")'], stdin = tf)
  120.         p.wait()
  121.         self.assertEqual(p.returncode, 1)
  122.  
  123.     
  124.     def test_stdout_pipe(self):
  125.         p = subprocess.Popen([
  126.             sys.executable,
  127.             '-c',
  128.             'import sys; sys.stdout.write("orange")'], stdout = subprocess.PIPE)
  129.         self.assertEqual(p.stdout.read(), 'orange')
  130.  
  131.     
  132.     def test_stdout_filedes(self):
  133.         tf = tempfile.TemporaryFile()
  134.         d = tf.fileno()
  135.         p = subprocess.Popen([
  136.             sys.executable,
  137.             '-c',
  138.             'import sys; sys.stdout.write("orange")'], stdout = d)
  139.         p.wait()
  140.         os.lseek(d, 0, 0)
  141.         self.assertEqual(os.read(d, 1024), 'orange')
  142.  
  143.     
  144.     def test_stdout_fileobj(self):
  145.         tf = tempfile.TemporaryFile()
  146.         p = subprocess.Popen([
  147.             sys.executable,
  148.             '-c',
  149.             'import sys; sys.stdout.write("orange")'], stdout = tf)
  150.         p.wait()
  151.         tf.seek(0)
  152.         self.assertEqual(tf.read(), 'orange')
  153.  
  154.     
  155.     def test_stderr_pipe(self):
  156.         p = subprocess.Popen([
  157.             sys.executable,
  158.             '-c',
  159.             'import sys; sys.stderr.write("strawberry")'], stderr = subprocess.PIPE)
  160.         self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()), 'strawberry')
  161.  
  162.     
  163.     def test_stderr_filedes(self):
  164.         tf = tempfile.TemporaryFile()
  165.         d = tf.fileno()
  166.         p = subprocess.Popen([
  167.             sys.executable,
  168.             '-c',
  169.             'import sys; sys.stderr.write("strawberry")'], stderr = d)
  170.         p.wait()
  171.         os.lseek(d, 0, 0)
  172.         self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)), 'strawberry')
  173.  
  174.     
  175.     def test_stderr_fileobj(self):
  176.         tf = tempfile.TemporaryFile()
  177.         p = subprocess.Popen([
  178.             sys.executable,
  179.             '-c',
  180.             'import sys; sys.stderr.write("strawberry")'], stderr = tf)
  181.         p.wait()
  182.         tf.seek(0)
  183.         self.assertEqual(remove_stderr_debug_decorations(tf.read()), 'strawberry')
  184.  
  185.     
  186.     def test_stdout_stderr_pipe(self):
  187.         p = subprocess.Popen([
  188.             sys.executable,
  189.             '-c',
  190.             'import sys;sys.stdout.write("apple");sys.stdout.flush();sys.stderr.write("orange")'], stdout = subprocess.PIPE, stderr = subprocess.STDOUT)
  191.         output = p.stdout.read()
  192.         stripped = remove_stderr_debug_decorations(output)
  193.         self.assertEqual(stripped, 'appleorange')
  194.  
  195.     
  196.     def test_stdout_stderr_file(self):
  197.         tf = tempfile.TemporaryFile()
  198.         p = subprocess.Popen([
  199.             sys.executable,
  200.             '-c',
  201.             'import sys;sys.stdout.write("apple");sys.stdout.flush();sys.stderr.write("orange")'], stdout = tf, stderr = tf)
  202.         p.wait()
  203.         tf.seek(0)
  204.         output = tf.read()
  205.         stripped = remove_stderr_debug_decorations(output)
  206.         self.assertEqual(stripped, 'appleorange')
  207.  
  208.     
  209.     def test_cwd(self):
  210.         tmpdir = os.getenv('TEMP', '/tmp')
  211.         cwd = os.getcwd()
  212.         os.chdir(tmpdir)
  213.         tmpdir = os.getcwd()
  214.         os.chdir(cwd)
  215.         p = subprocess.Popen([
  216.             sys.executable,
  217.             '-c',
  218.             'import sys,os;sys.stdout.write(os.getcwd())'], stdout = subprocess.PIPE, cwd = tmpdir)
  219.         normcase = os.path.normcase
  220.         self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
  221.  
  222.     
  223.     def test_env(self):
  224.         newenv = os.environ.copy()
  225.         newenv['FRUIT'] = 'orange'
  226.         p = subprocess.Popen([
  227.             sys.executable,
  228.             '-c',
  229.             'import sys,os;sys.stdout.write(os.getenv("FRUIT"))'], stdout = subprocess.PIPE, env = newenv)
  230.         self.assertEqual(p.stdout.read(), 'orange')
  231.  
  232.     
  233.     def test_communicate(self):
  234.         p = subprocess.Popen([
  235.             sys.executable,
  236.             '-c',
  237.             'import sys,os;sys.stderr.write("pineapple");sys.stdout.write(sys.stdin.read())'], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
  238.         (stdout, stderr) = p.communicate('banana')
  239.         self.assertEqual(stdout, 'banana')
  240.         self.assertEqual(remove_stderr_debug_decorations(stderr), 'pineapple')
  241.  
  242.     
  243.     def test_communicate_returns(self):
  244.         p = subprocess.Popen([
  245.             sys.executable,
  246.             '-c',
  247.             'import sys; sys.exit(47)'])
  248.         (stdout, stderr) = p.communicate()
  249.         self.assertEqual(stdout, None)
  250.         self.assertEqual(stderr, None)
  251.  
  252.     
  253.     def test_communicate_pipe_buf(self):
  254.         (x, y) = os.pipe()
  255.         if mswindows:
  256.             pipe_buf = 512
  257.         else:
  258.             pipe_buf = os.fpathconf(x, 'PC_PIPE_BUF')
  259.         os.close(x)
  260.         os.close(y)
  261.         p = subprocess.Popen([
  262.             sys.executable,
  263.             '-c',
  264.             'import sys,os;sys.stdout.write(sys.stdin.read(47));sys.stderr.write("xyz"*%d);sys.stdout.write(sys.stdin.read())' % pipe_buf], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
  265.         string_to_write = 'abc' * pipe_buf
  266.         (stdout, stderr) = p.communicate(string_to_write)
  267.         self.assertEqual(stdout, string_to_write)
  268.  
  269.     
  270.     def test_writes_before_communicate(self):
  271.         p = subprocess.Popen([
  272.             sys.executable,
  273.             '-c',
  274.             'import sys,os;sys.stdout.write(sys.stdin.read())'], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
  275.         p.stdin.write('banana')
  276.         (stdout, stderr) = p.communicate('split')
  277.         self.assertEqual(stdout, 'bananasplit')
  278.         self.assertEqual(remove_stderr_debug_decorations(stderr), '')
  279.  
  280.     
  281.     def test_universal_newlines(self):
  282.         p = subprocess.Popen([
  283.             sys.executable,
  284.             '-c',
  285.             'import sys,os;' + SETBINARY + 'sys.stdout.write("line1\\n");sys.stdout.flush();sys.stdout.write("line2\\r");sys.stdout.flush();sys.stdout.write("line3\\r\\n");sys.stdout.flush();sys.stdout.write("line4\\r");sys.stdout.flush();sys.stdout.write("\\nline5");sys.stdout.flush();sys.stdout.write("\\nline6");'], stdout = subprocess.PIPE, universal_newlines = 1)
  286.         stdout = p.stdout.read()
  287.         if hasattr(open, 'newlines'):
  288.             self.assertEqual(stdout, 'line1\nline2\nline3\nline4\nline5\nline6')
  289.         else:
  290.             self.assertEqual(stdout, 'line1\nline2\rline3\r\nline4\r\nline5\nline6')
  291.  
  292.     
  293.     def test_universal_newlines_communicate(self):
  294.         p = subprocess.Popen([
  295.             sys.executable,
  296.             '-c',
  297.             'import sys,os;' + SETBINARY + 'sys.stdout.write("line1\\n");sys.stdout.flush();sys.stdout.write("line2\\r");sys.stdout.flush();sys.stdout.write("line3\\r\\n");sys.stdout.flush();sys.stdout.write("line4\\r");sys.stdout.flush();sys.stdout.write("\\nline5");sys.stdout.flush();sys.stdout.write("\\nline6");'], stdout = subprocess.PIPE, stderr = subprocess.PIPE, universal_newlines = 1)
  298.         (stdout, stderr) = p.communicate()
  299.         if hasattr(open, 'newlines'):
  300.             self.assertEqual(stdout, 'line1\nline2\nline3\nline4\nline5\nline6')
  301.         else:
  302.             self.assertEqual(stdout, 'line1\nline2\rline3\r\nline4\r\nline5\nline6')
  303.  
  304.     
  305.     def test_no_leaking(self):
  306.         if test_support.is_resource_enabled('subprocess') and not mswindows:
  307.             max_handles = 1026
  308.         else:
  309.             max_handles = 65
  310.         for i in range(max_handles):
  311.             p = subprocess.Popen([
  312.                 sys.executable,
  313.                 '-c',
  314.                 'import sys;sys.stdout.write(sys.stdin.read())'], stdin = subprocess.PIPE, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
  315.             data = p.communicate('lime')[0]
  316.             self.assertEqual(data, 'lime')
  317.         
  318.  
  319.     
  320.     def test_list2cmdline(self):
  321.         self.assertEqual(subprocess.list2cmdline([
  322.             'a b c',
  323.             'd',
  324.             'e']), '"a b c" d e')
  325.         self.assertEqual(subprocess.list2cmdline([
  326.             'ab"c',
  327.             '\\',
  328.             'd']), 'ab\\"c \\ d')
  329.         self.assertEqual(subprocess.list2cmdline([
  330.             'a\\\\\\b',
  331.             'de fg',
  332.             'h']), 'a\\\\\\b "de fg" h')
  333.         self.assertEqual(subprocess.list2cmdline([
  334.             'a\\"b',
  335.             'c',
  336.             'd']), 'a\\\\\\"b c d')
  337.         self.assertEqual(subprocess.list2cmdline([
  338.             'a\\\\b c',
  339.             'd',
  340.             'e']), '"a\\\\b c" d e')
  341.         self.assertEqual(subprocess.list2cmdline([
  342.             'a\\\\b\\ c',
  343.             'd',
  344.             'e']), '"a\\\\b\\ c" d e')
  345.  
  346.     
  347.     def test_poll(self):
  348.         p = subprocess.Popen([
  349.             sys.executable,
  350.             '-c',
  351.             'import time; time.sleep(1)'])
  352.         count = 0
  353.         while p.poll() is None:
  354.             time.sleep(0.10000000000000001)
  355.             count += 1
  356.         self.assert_(count >= 2)
  357.         self.assertEqual(p.poll(), 0)
  358.  
  359.     
  360.     def test_wait(self):
  361.         p = subprocess.Popen([
  362.             sys.executable,
  363.             '-c',
  364.             'import time; time.sleep(2)'])
  365.         self.assertEqual(p.wait(), 0)
  366.         self.assertEqual(p.wait(), 0)
  367.  
  368.     
  369.     def test_invalid_bufsize(self):
  370.         
  371.         try:
  372.             subprocess.Popen([
  373.                 sys.executable,
  374.                 '-c',
  375.                 'pass'], 'orange')
  376.         except TypeError:
  377.             pass
  378.  
  379.         self.fail('Expected TypeError')
  380.  
  381.     if not mswindows:
  382.         
  383.         def test_exceptions(self):
  384.             
  385.             try:
  386.                 p = subprocess.Popen([
  387.                     sys.executable,
  388.                     '-c',
  389.                     ''], cwd = '/this/path/does/not/exist')
  390.             except OSError:
  391.                 e = None
  392.                 self.assertNotEqual(e.child_traceback.find('os.chdir'), -1)
  393.  
  394.             self.fail('Expected OSError')
  395.  
  396.         
  397.         def test_run_abort(self):
  398.             p = subprocess.Popen([
  399.                 sys.executable,
  400.                 '-c',
  401.                 'import os; os.abort()'])
  402.             p.wait()
  403.             self.assertEqual(-(p.returncode), signal.SIGABRT)
  404.  
  405.         
  406.         def test_preexec(self):
  407.             p = subprocess.Popen([
  408.                 sys.executable,
  409.                 '-c',
  410.                 'import sys,os;sys.stdout.write(os.getenv("FRUIT"))'], stdout = subprocess.PIPE, preexec_fn = (lambda : os.putenv('FRUIT', 'apple')))
  411.             self.assertEqual(p.stdout.read(), 'apple')
  412.  
  413.         
  414.         def test_args_string(self):
  415.             (f, fname) = self.mkstemp()
  416.             os.write(f, '#!/bin/sh\n')
  417.             os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" % sys.executable)
  418.             os.close(f)
  419.             os.chmod(fname, 448)
  420.             p = subprocess.Popen(fname)
  421.             p.wait()
  422.             os.remove(fname)
  423.             self.assertEqual(p.returncode, 47)
  424.  
  425.         
  426.         def test_invalid_args(self):
  427.             self.assertRaises(ValueError, subprocess.call, [
  428.                 sys.executable,
  429.                 '-c',
  430.                 'import sys; sys.exit(47)'], startupinfo = 47)
  431.             self.assertRaises(ValueError, subprocess.call, [
  432.                 sys.executable,
  433.                 '-c',
  434.                 'import sys; sys.exit(47)'], creationflags = 47)
  435.  
  436.         
  437.         def test_shell_sequence(self):
  438.             newenv = os.environ.copy()
  439.             newenv['FRUIT'] = 'apple'
  440.             p = subprocess.Popen([
  441.                 'echo $FRUIT'], shell = 1, stdout = subprocess.PIPE, env = newenv)
  442.             self.assertEqual(p.stdout.read().strip(), 'apple')
  443.  
  444.         
  445.         def test_shell_string(self):
  446.             newenv = os.environ.copy()
  447.             newenv['FRUIT'] = 'apple'
  448.             p = subprocess.Popen('echo $FRUIT', shell = 1, stdout = subprocess.PIPE, env = newenv)
  449.             self.assertEqual(p.stdout.read().strip(), 'apple')
  450.  
  451.         
  452.         def test_call_string(self):
  453.             (f, fname) = self.mkstemp()
  454.             os.write(f, '#!/bin/sh\n')
  455.             os.write(f, "exec %s -c 'import sys; sys.exit(47)'\n" % sys.executable)
  456.             os.close(f)
  457.             os.chmod(fname, 448)
  458.             rc = subprocess.call(fname)
  459.             os.remove(fname)
  460.             self.assertEqual(rc, 47)
  461.  
  462.     
  463.     if mswindows:
  464.         
  465.         def test_startupinfo(self):
  466.             STARTF_USESHOWWINDOW = 1
  467.             SW_MAXIMIZE = 3
  468.             startupinfo = subprocess.STARTUPINFO()
  469.             startupinfo.dwFlags = STARTF_USESHOWWINDOW
  470.             startupinfo.wShowWindow = SW_MAXIMIZE
  471.             subprocess.call([
  472.                 sys.executable,
  473.                 '-c',
  474.                 'import sys; sys.exit(0)'], startupinfo = startupinfo)
  475.  
  476.         
  477.         def test_creationflags(self):
  478.             CREATE_NEW_CONSOLE = 16
  479.             sys.stderr.write('    a DOS box should flash briefly ...\n')
  480.             subprocess.call(sys.executable + ' -c "import time; time.sleep(0.25)"', creationflags = CREATE_NEW_CONSOLE)
  481.  
  482.         
  483.         def test_invalid_args(self):
  484.             self.assertRaises(ValueError, subprocess.call, [
  485.                 sys.executable,
  486.                 '-c',
  487.                 'import sys; sys.exit(47)'], preexec_fn = (lambda : 1))
  488.             self.assertRaises(ValueError, subprocess.call, [
  489.                 sys.executable,
  490.                 '-c',
  491.                 'import sys; sys.exit(47)'], close_fds = True)
  492.  
  493.         
  494.         def test_shell_sequence(self):
  495.             newenv = os.environ.copy()
  496.             newenv['FRUIT'] = 'physalis'
  497.             p = subprocess.Popen([
  498.                 'set'], shell = 1, stdout = subprocess.PIPE, env = newenv)
  499.             self.assertNotEqual(p.stdout.read().find('physalis'), -1)
  500.  
  501.         
  502.         def test_shell_string(self):
  503.             newenv = os.environ.copy()
  504.             newenv['FRUIT'] = 'physalis'
  505.             p = subprocess.Popen('set', shell = 1, stdout = subprocess.PIPE, env = newenv)
  506.             self.assertNotEqual(p.stdout.read().find('physalis'), -1)
  507.  
  508.         
  509.         def test_call_string(self):
  510.             rc = subprocess.call(sys.executable + ' -c "import sys; sys.exit(47)"')
  511.             self.assertEqual(rc, 47)
  512.  
  513.     
  514.  
  515.  
  516. def test_main():
  517.     test_support.run_unittest(ProcessTestCase)
  518.  
  519. if __name__ == '__main__':
  520.     test_main()
  521.  
  522.